
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
//import org.apache.flink.table.functions.UserDefinedAggregateFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.call;
import java.util.Arrays;
import static org.apache.flink.table.api.Expressions.$;

public class FlatAggregate
{

    public static void main(String[] args) throws Exception

    {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Order> ds1 = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(3L, "rubber", 2),
                new Order(1L, "diaper", 4)
        ));


//
//        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);


//        DataSet<Order> ds1 = env.fromCollection(Arrays.asList(
//                new Order(1L, "beer", 3),
//                new Order(3L, "rubber", 2),
//                new Order(1L, "diaper", 4)
//        ));


//        Top2 func = new Top2();
//        tEnv.registerFunction("top2",func);

        TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> func = new Top2();
        tEnv.registerFunction("top2",new Top2());


        tEnv.createTemporaryView("Orders",ds1);

        Table orders = tEnv.from("Orders");
        Table result = orders
            .groupBy($("amount"))
            .flatAggregate(call("top2", $("amount")).as("first", "second"))
            .select($("amount"), $("first"), $("second"));

        tEnv.toRetractStream(result, Row.class).print();
        env.execute();



    }

}
